/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.zookeeper.server.quorum;

import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.net.SocketException;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;

import java.util.concurrent.TimeUnit;
import java.util.Random;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.zookeeper.jmx.MBeanRegistry;
import org.apache.zookeeper.server.quorum.Election;
import org.apache.zookeeper.server.quorum.Vote;
import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;

/**
 * @deprecated This class has been deprecated as of release 3.4.0.
 */
@Deprecated
public class AuthFastLeaderElection implements Election {
	private class Messenger {

		class WorkerReceiver implements Runnable {

			Messenger myMsg;
			DatagramSocket mySocket;

			WorkerReceiver(DatagramSocket s, Messenger msg) {
				mySocket = s;
				myMsg = msg;
			}

			public void run() {
				byte responseBytes[] = new byte[48];
				ByteBuffer responseBuffer = ByteBuffer.wrap(responseBytes);
				DatagramPacket responsePacket = new DatagramPacket(responseBytes, responseBytes.length);
				while (true) {
					// Sleeps on receive
					try {
						responseBuffer.clear();
						mySocket.receive(responsePacket);
					} catch (IOException e) {
						LOG.warn("Ignoring exception receiving", e);
					}
					// Receive new message
					if (responsePacket.getLength() != responseBytes.length) {
						LOG.warn("Got a short response: " + responsePacket.getLength() + " "
								+ responsePacket.toString());
						continue;
					}
					responseBuffer.clear();
					int type = responseBuffer.getInt();
					if ((type > 3) || (type < 0)) {
						LOG.warn("Got bad Msg type: " + type);
						continue;
					}
					long tag = responseBuffer.getLong();

					QuorumPeer.ServerState ackstate = QuorumPeer.ServerState.LOOKING;
					switch (responseBuffer.getInt()) {
					case 0:
						ackstate = QuorumPeer.ServerState.LOOKING;
						break;
					case 1:
						ackstate = QuorumPeer.ServerState.LEADING;
						break;
					case 2:
						ackstate = QuorumPeer.ServerState.FOLLOWING;
						break;
					}

					Vote current = self.getCurrentVote();

					switch (type) {
					case 0:
						// Receive challenge request
						ToSend c = new ToSend(ToSend.mType.challenge, tag, current.getId(), current.getZxid(),
								logicalclock, self.getPeerState(),
								(InetSocketAddress) responsePacket.getSocketAddress());
						sendqueue.offer(c);
						break;
					case 1:
						// Receive challenge and store somewhere else
						long challenge = responseBuffer.getLong();
						saveChallenge(tag, challenge);

						break;
					case 2:
						Notification n = new Notification();
						n.leader = responseBuffer.getLong();
						n.zxid = responseBuffer.getLong();
						n.epoch = responseBuffer.getLong();
						n.state = ackstate;
						n.addr = (InetSocketAddress) responsePacket.getSocketAddress();

						if ((myMsg.lastEpoch <= n.epoch) && ((n.zxid > myMsg.lastProposedZxid)
								|| ((n.zxid == myMsg.lastProposedZxid) && (n.leader > myMsg.lastProposedLeader)))) {
							myMsg.lastProposedZxid = n.zxid;
							myMsg.lastProposedLeader = n.leader;
							myMsg.lastEpoch = n.epoch;
						}

						long recChallenge;
						InetSocketAddress addr = (InetSocketAddress) responsePacket.getSocketAddress();
						if (authEnabled) {
							ConcurrentHashMap<Long, Long> tmpMap = addrChallengeMap.get(addr);
							if (tmpMap != null) {
								if (tmpMap.get(tag) != null) {
									recChallenge = responseBuffer.getLong();

									if (tmpMap.get(tag) == recChallenge) {
										recvqueue.offer(n);

										ToSend a = new ToSend(ToSend.mType.ack, tag, current.getId(), current.getZxid(),
												logicalclock, self.getPeerState(), addr);

										sendqueue.offer(a);
									} else {
										LOG.warn("Incorrect challenge: " + recChallenge + ", "
												+ addrChallengeMap.toString());
									}
								} else {
									LOG.warn("No challenge for host: " + addr + " " + tag);
								}
							}
						} else {
							recvqueue.offer(n);

							ToSend a = new ToSend(ToSend.mType.ack, tag, current.getId(), current.getZxid(),
									logicalclock, self.getPeerState(),
									(InetSocketAddress) responsePacket.getSocketAddress());

							sendqueue.offer(a);
						}
						break;

					// Upon reception of an ack message, remove it from the
					// queue
					case 3:
						Semaphore s = ackMutex.get(tag);

						if (s != null)
							s.release();
						else
							LOG.error("Empty ack semaphore");

						ackset.add(tag);

						if (authEnabled) {
							ConcurrentHashMap<Long, Long> tmpMap = addrChallengeMap
									.get(responsePacket.getSocketAddress());
							if (tmpMap != null) {
								tmpMap.remove(tag);
							} else {
								LOG.warn("No such address in the ensemble configuration "
										+ responsePacket.getSocketAddress());
							}
						}

						if (ackstate != QuorumPeer.ServerState.LOOKING) {
							Notification outofsync = new Notification();
							outofsync.leader = responseBuffer.getLong();
							outofsync.zxid = responseBuffer.getLong();
							outofsync.epoch = responseBuffer.getLong();
							outofsync.state = ackstate;
							outofsync.addr = (InetSocketAddress) responsePacket.getSocketAddress();

							recvqueue.offer(outofsync);
						}

						break;
					// Default case
					default:
						LOG.warn("Received message of incorrect type " + type);
						break;
					}
				}
			}

			boolean saveChallenge(long tag, long challenge) {
				Semaphore s = challengeMutex.get(tag);
				if (s != null) {
					synchronized (Messenger.this) {
						challengeMap.put(tag, challenge);
						challengeMutex.remove(tag);
					}

					s.release();
				} else {
					LOG.error("No challenge mutex object");
				}

				return true;
			}
		}

		class WorkerSender implements Runnable {

			int ackWait = finalizeWait;
			int maxAttempts;
			Random rand;

			/*
			 * Receives a socket and max number of attempts as input
			 */

			WorkerSender(int attempts) {
				maxAttempts = attempts;
				rand = new Random(java.lang.Thread.currentThread().getId() + System.currentTimeMillis());
			}

			long genChallenge() {
				byte buf[] = new byte[8];

				buf[0] = (byte) ((challengeCounter & 0xff000000) >>> 24);
				buf[1] = (byte) ((challengeCounter & 0x00ff0000) >>> 16);
				buf[2] = (byte) ((challengeCounter & 0x0000ff00) >>> 8);
				buf[3] = (byte) ((challengeCounter & 0x000000ff));

				challengeCounter++;
				int secret = rand.nextInt(java.lang.Integer.MAX_VALUE);

				buf[4] = (byte) ((secret & 0xff000000) >>> 24);
				buf[5] = (byte) ((secret & 0x00ff0000) >>> 16);
				buf[6] = (byte) ((secret & 0x0000ff00) >>> 8);
				buf[7] = (byte) ((secret & 0x000000ff));

				return (((long) (buf[0] & 0xFF)) << 56) + (((long) (buf[1] & 0xFF)) << 48)
						+ (((long) (buf[2] & 0xFF)) << 40) + (((long) (buf[3] & 0xFF)) << 32)
						+ (((long) (buf[4] & 0xFF)) << 24) + (((long) (buf[5] & 0xFF)) << 16)
						+ (((long) (buf[6] & 0xFF)) << 8) + ((long) (buf[7] & 0xFF));
			}

			private void process(ToSend m) {
				int attempts = 0;
				byte zeroes[];
				byte requestBytes[] = new byte[48];
				DatagramPacket requestPacket = new DatagramPacket(requestBytes, requestBytes.length);
				ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes);

				switch (m.type) {
				case 0:
					/*
					 * Building challenge request packet to send
					 */
					requestBuffer.clear();
					requestBuffer.putInt(ToSend.mType.crequest.ordinal());
					requestBuffer.putLong(m.tag);
					requestBuffer.putInt(m.state.ordinal());
					zeroes = new byte[32];
					requestBuffer.put(zeroes);

					requestPacket.setLength(48);
					try {
						requestPacket.setSocketAddress(m.addr);
					} catch (IllegalArgumentException e) {
						// Sun doesn't include the address that causes this
						// exception to be thrown, so we wrap the exception
						// in order to capture this critical detail.
						throw new IllegalArgumentException("Unable to set socket address on packet, msg:"
								+ e.getMessage() + " with addr:" + m.addr, e);
					}

					try {
						if (challengeMap.get(m.tag) == null) {
							mySocket.send(requestPacket);
						}
					} catch (IOException e) {
						LOG.warn("Exception while sending challenge: ", e);
					}

					break;
				case 1:
					/*
					 * Building challenge packet to send
					 */

					long newChallenge;
					ConcurrentHashMap<Long, Long> tmpMap = addrChallengeMap.get(m.addr);
					if (tmpMap != null) {
						Long tmpLong = tmpMap.get(m.tag);
						if (tmpLong != null) {
							newChallenge = tmpLong;
						} else {
							newChallenge = genChallenge();
						}

						tmpMap.put(m.tag, newChallenge);

						requestBuffer.clear();
						requestBuffer.putInt(ToSend.mType.challenge.ordinal());
						requestBuffer.putLong(m.tag);
						requestBuffer.putInt(m.state.ordinal());
						requestBuffer.putLong(newChallenge);
						zeroes = new byte[24];
						requestBuffer.put(zeroes);

						requestPacket.setLength(48);
						try {
							requestPacket.setSocketAddress(m.addr);
						} catch (IllegalArgumentException e) {
							// Sun doesn't include the address that causes this
							// exception to be thrown, so we wrap the exception
							// in order to capture this critical detail.
							throw new IllegalArgumentException("Unable to set socket address on packet, msg:"
									+ e.getMessage() + " with addr:" + m.addr, e);
						}

						try {
							mySocket.send(requestPacket);
						} catch (IOException e) {
							LOG.warn("Exception while sending challenge: ", e);
						}
					} else {
						LOG.error("Address is not in the configuration: " + m.addr);
					}

					break;
				case 2:

					/*
					 * Building notification packet to send
					 */

					requestBuffer.clear();
					requestBuffer.putInt(m.type);
					requestBuffer.putLong(m.tag);
					requestBuffer.putInt(m.state.ordinal());
					requestBuffer.putLong(m.leader);
					requestBuffer.putLong(m.zxid);
					requestBuffer.putLong(m.epoch);
					zeroes = new byte[8];
					requestBuffer.put(zeroes);

					requestPacket.setLength(48);
					try {
						requestPacket.setSocketAddress(m.addr);
					} catch (IllegalArgumentException e) {
						// Sun doesn't include the address that causes this
						// exception to be thrown, so we wrap the exception
						// in order to capture this critical detail.
						throw new IllegalArgumentException("Unable to set socket address on packet, msg:"
								+ e.getMessage() + " with addr:" + m.addr, e);
					}

					boolean myChallenge = false;
					boolean myAck = false;

					while (attempts < maxAttempts) {
						try {
							/*
							 * Try to obtain a challenge only if does not have one yet
							 */

							if (!myChallenge && authEnabled) {
								ToSend crequest = new ToSend(ToSend.mType.crequest, m.tag, m.leader, m.zxid, m.epoch,
										QuorumPeer.ServerState.LOOKING, m.addr);
								sendqueue.offer(crequest);

								try {
									double timeout = ackWait * java.lang.Math.pow(2, attempts);

									Semaphore s = new Semaphore(0);
									synchronized (Messenger.this) {
										challengeMutex.put(m.tag, s);
										s.tryAcquire((long) timeout, TimeUnit.MILLISECONDS);
										myChallenge = challengeMap.containsKey(m.tag);
									}
								} catch (InterruptedException e) {
									LOG.warn("Challenge request exception: ", e);
								}
							}

							/*
							 * If don't have challenge yet, skip sending notification
							 */

							if (authEnabled && !myChallenge) {
								attempts++;
								continue;
							}

							if (authEnabled) {
								requestBuffer.position(40);
								Long tmpLong = challengeMap.get(m.tag);
								if (tmpLong != null) {
									requestBuffer.putLong(tmpLong);
								} else {
									LOG.warn("No challenge with tag: " + m.tag);
								}
							}
							mySocket.send(requestPacket);
							try {
								Semaphore s = new Semaphore(0);
								double timeout = ackWait * java.lang.Math.pow(10, attempts);
								ackMutex.put(m.tag, s);
								s.tryAcquire((int) timeout, TimeUnit.MILLISECONDS);
							} catch (InterruptedException e) {
								LOG.warn("Ack exception: ", e);
							}

							if (ackset.remove(m.tag)) {
								myAck = true;
							}

						} catch (IOException e) {
							LOG.warn("Sending exception: ", e);
							/*
							 * Do nothing, just try again
							 */
						}
						if (myAck) {
							/*
							 * Received ack successfully, so return
							 */
							challengeMap.remove(m.tag);

							return;
						} else
							attempts++;
					}
					/*
					 * Return message to queue for another attempt later if epoch hasn't changed.
					 */
					if (m.epoch == logicalclock) {
						challengeMap.remove(m.tag);
						sendqueue.offer(m);
					}
					break;
				case 3:

					requestBuffer.clear();
					requestBuffer.putInt(m.type);
					requestBuffer.putLong(m.tag);
					requestBuffer.putInt(m.state.ordinal());
					requestBuffer.putLong(m.leader);
					requestBuffer.putLong(m.zxid);
					requestBuffer.putLong(m.epoch);

					requestPacket.setLength(48);
					try {
						requestPacket.setSocketAddress(m.addr);
					} catch (IllegalArgumentException e) {
						// Sun doesn't include the address that causes this
						// exception to be thrown, so we wrap the exception
						// in order to capture this critical detail.
						throw new IllegalArgumentException("Unable to set socket address on packet, msg:"
								+ e.getMessage() + " with addr:" + m.addr, e);
					}

					try {
						mySocket.send(requestPacket);
					} catch (IOException e) {
						LOG.warn("Exception while sending ack: ", e);
					}
					break;
				}
			}

			public void run() {
				while (true) {
					try {
						ToSend m = sendqueue.take();
						process(m);
					} catch (InterruptedException e) {
						break;
					}

				}
			}
		}

		final ConcurrentHashMap<Long, Semaphore> ackMutex;
		final Set<Long> ackset;
		final ConcurrentHashMap<InetSocketAddress, ConcurrentHashMap<Long, Long>> addrChallengeMap;
		final ConcurrentHashMap<Long, Long> challengeMap;
		final ConcurrentHashMap<Long, Semaphore> challengeMutex;
		long lastEpoch;
		long lastProposedLeader;

		long lastProposedZxid;

		final DatagramSocket mySocket;

		Messenger(int threads, DatagramSocket s) {
			mySocket = s;
			ackset = Collections.<Long>newSetFromMap(new ConcurrentHashMap<Long, Boolean>());
			challengeMap = new ConcurrentHashMap<Long, Long>();
			challengeMutex = new ConcurrentHashMap<Long, Semaphore>();
			ackMutex = new ConcurrentHashMap<Long, Semaphore>();
			addrChallengeMap = new ConcurrentHashMap<InetSocketAddress, ConcurrentHashMap<Long, Long>>();
			lastProposedLeader = 0;
			lastProposedZxid = 0;
			lastEpoch = 0;

			for (int i = 0; i < threads; ++i) {
				Thread t = new Thread(new WorkerSender(3), "WorkerSender Thread: " + (i + 1));
				t.setDaemon(true);
				t.start();
			}

			for (QuorumServer server : self.getVotingView().values()) {
				InetSocketAddress saddr = new InetSocketAddress(server.addr.getAddress(), port);
				addrChallengeMap.put(saddr, new ConcurrentHashMap<Long, Long>());
			}

			Thread t = new Thread(new WorkerReceiver(s, this), "WorkerReceiver Thread");
			t.start();
		}

		public boolean queueEmpty() {
			return (sendqueue.isEmpty() || ackset.isEmpty() || recvqueue.isEmpty());
		}

	}

	static public class Notification {
		/*
		 * Address of the sender
		 */
		InetSocketAddress addr;

		/*
		 * Epoch
		 */
		long epoch;

		/*
		 * Proposed leader
		 */
		long leader;

		/*
		 * current state of sender
		 */
		QuorumPeer.ServerState state;

		/*
		 * zxid of the proposed leader
		 */
		long zxid;
	}

	/*
	 * Messages to send, both Notifications and Acks
	 */
	static public class ToSend {
		static enum mType {
			ack, challenge, crequest, notification
		}

		InetSocketAddress addr;

		/*
		 * Epoch
		 */
		long epoch;

		/*
		 * Proposed leader in the case of notification
		 */
		long leader;

		/*
		 * Current state;
		 */
		QuorumPeer.ServerState state;

		/*
		 * Message tag
		 */
		long tag;

		/*
		 * Message type: 0 notification, 1 acknowledgement
		 */
		int type;

		/*
		 * id contains the tag for acks, and zxid for notifications
		 */
		long zxid;

		ToSend(mType type, long tag, long leader, long zxid, long epoch, ServerState state, InetSocketAddress addr) {

			switch (type) {
			case crequest:
				this.type = 0;
				this.tag = tag;
				this.leader = leader;
				this.zxid = zxid;
				this.epoch = epoch;
				this.state = state;
				this.addr = addr;

				break;
			case challenge:
				this.type = 1;
				this.tag = tag;
				this.leader = leader;
				this.zxid = zxid;
				this.epoch = epoch;
				this.state = state;
				this.addr = addr;

				break;
			case notification:
				this.type = 2;
				this.leader = leader;
				this.zxid = zxid;
				this.epoch = epoch;
				this.state = QuorumPeer.ServerState.LOOKING;
				this.tag = tag;
				this.addr = addr;

				break;
			case ack:
				this.type = 3;
				this.tag = tag;
				this.leader = leader;
				this.zxid = zxid;
				this.epoch = epoch;
				this.state = state;
				this.addr = addr;

				break;
			default:
				break;
			}
		}
	}

	static int challengeCounter = 0;

	/*
	 * Challenge counter to avoid replay attacks
	 */

	/*
	 * Determine how much time a process has to wait once it believes that it has
	 * reached the end of leader election.
	 */
	static int finalizeWait = 100;

	/*
	 * Flag to determine whether to authenticate or not
	 */

	private static final Logger LOG = LoggerFactory.getLogger(AuthFastLeaderElection.class);

	static int maxTag = 0;

	/* Sequence numbers for messages */
	static int sequencer = 0;

	private boolean authEnabled = false;

	volatile long logicalclock; /* Election instance */

	DatagramSocket mySocket;

	int port;
	long proposedLeader;
	long proposedZxid;
	LinkedBlockingQueue<Notification> recvqueue;
	QuorumPeer self;
	LinkedBlockingQueue<ToSend> sendqueue;

	public AuthFastLeaderElection(QuorumPeer self) {
		starter(self);
	}

	public AuthFastLeaderElection(QuorumPeer self, boolean auth) {
		this.authEnabled = auth;
		starter(self);
	}

	private void leaveInstance() {
		logicalclock++;
	}

	/**
	 * Invoked in QuorumPeer to find or elect a new leader.
	 * 
	 * @throws InterruptedException
	 */
	public Vote lookForLeader() throws InterruptedException {
		try {
			self.jmxLeaderElectionBean = new LeaderElectionBean();
			MBeanRegistry.getInstance().register(self.jmxLeaderElectionBean, self.jmxLocalPeerBean);
		} catch (Exception e) {
			LOG.warn("Failed to register with JMX", e);
			self.jmxLeaderElectionBean = null;
		}

		try {
			HashMap<InetSocketAddress, Vote> recvset = new HashMap<InetSocketAddress, Vote>();

			HashMap<InetSocketAddress, Vote> outofelection = new HashMap<InetSocketAddress, Vote>();

			logicalclock++;

			proposedLeader = self.getId();
			proposedZxid = self.getLastLoggedZxid();

			LOG.info("Election tally");
			sendNotifications();

			/*
			 * Loop in which we exchange notifications until we find a leader
			 */

			while (self.getPeerState() == ServerState.LOOKING) {
				/*
				 * Remove next notification from queue, times out after 2 times the termination
				 * time
				 */
				Notification n = recvqueue.poll(2 * finalizeWait, TimeUnit.MILLISECONDS);

				/*
				 * Sends more notifications if haven't received enough. Otherwise processes new
				 * notification.
				 */
				if (n == null) {
					if (((!outofelection.isEmpty()) || (recvset.size() > 1)))
						sendNotifications();
				} else
					switch (n.state) {
					case LOOKING:
						if (n.epoch > logicalclock) {
							logicalclock = n.epoch;
							recvset.clear();
							if (totalOrderPredicate(n.leader, n.zxid)) {
								proposedLeader = n.leader;
								proposedZxid = n.zxid;
							}
							sendNotifications();
						} else if (n.epoch < logicalclock) {
							break;
						} else if (totalOrderPredicate(n.leader, n.zxid)) {
							proposedLeader = n.leader;
							proposedZxid = n.zxid;

							sendNotifications();
						}

						recvset.put(n.addr, new Vote(n.leader, n.zxid));

						// If have received from all nodes, then terminate
						if (self.getVotingView().size() == recvset.size()) {
							self.setPeerState(
									(proposedLeader == self.getId()) ? ServerState.LEADING : ServerState.FOLLOWING);
							// if (self.state == ServerState.FOLLOWING) {
							// Thread.sleep(100);
							// }
							leaveInstance();
							return new Vote(proposedLeader, proposedZxid);

						} else if (termPredicate(recvset, proposedLeader, proposedZxid)) {
							// Otherwise, wait for a fixed amount of time
							LOG.info("Passed predicate");
							Thread.sleep(finalizeWait);

							// Notification probe = recvqueue.peek();

							// Verify if there is any change in the proposed leader
							while ((!recvqueue.isEmpty())
									&& !totalOrderPredicate(recvqueue.peek().leader, recvqueue.peek().zxid)) {
								recvqueue.poll();
							}
							if (recvqueue.isEmpty()) {
								// LOG.warn("Proposed leader: " +
								// proposedLeader);
								self.setPeerState(
										(proposedLeader == self.getId()) ? ServerState.LEADING : ServerState.FOLLOWING);

								leaveInstance();
								return new Vote(proposedLeader, proposedZxid);
							}
						}
						break;
					case LEADING:
						outofelection.put(n.addr, new Vote(n.leader, n.zxid));

						if (termPredicate(outofelection, n.leader, n.zxid)) {

							self.setPeerState((n.leader == self.getId()) ? ServerState.LEADING : ServerState.FOLLOWING);

							leaveInstance();
							return new Vote(n.leader, n.zxid);
						}
						break;
					case FOLLOWING:
						outofelection.put(n.addr, new Vote(n.leader, n.zxid));

						if (termPredicate(outofelection, n.leader, n.zxid)) {

							self.setPeerState((n.leader == self.getId()) ? ServerState.LEADING : ServerState.FOLLOWING);

							leaveInstance();
							return new Vote(n.leader, n.zxid);
						}
						break;
					default:
						break;
					}
			}

			return null;
		} finally {
			try {
				if (self.jmxLeaderElectionBean != null) {
					MBeanRegistry.getInstance().unregister(self.jmxLeaderElectionBean);
				}
			} catch (Exception e) {
				LOG.warn("Failed to unregister with JMX", e);
			}
			self.jmxLeaderElectionBean = null;
		}
	}

	private void sendNotifications() {
		for (QuorumServer server : self.getView().values()) {

			ToSend notmsg = new ToSend(ToSend.mType.notification, AuthFastLeaderElection.sequencer++, proposedLeader,
					proposedZxid, logicalclock, QuorumPeer.ServerState.LOOKING,
					self.getView().get(server.id).electionAddr);

			sendqueue.offer(notmsg);
		}
	}

	/**
	 * There is nothing to shutdown in this implementation of leader election, so we
	 * simply have an empty method.
	 */
	public void shutdown() {
	}

	private void starter(QuorumPeer self) {
		this.self = self;
		port = self.getVotingView().get(self.getId()).electionAddr.getPort();
		proposedLeader = -1;
		proposedZxid = -1;

		try {
			mySocket = new DatagramSocket(port);
			// mySocket.setSoTimeout(20000);
		} catch (SocketException e1) {
			e1.printStackTrace();
			throw new RuntimeException();
		}
		sendqueue = new LinkedBlockingQueue<ToSend>(2 * self.getVotingView().size());
		recvqueue = new LinkedBlockingQueue<Notification>(2 * self.getVotingView().size());
		new Messenger(self.getVotingView().size() * 2, mySocket);
	}

	private boolean termPredicate(HashMap<InetSocketAddress, Vote> votes, long l, long zxid) {

		Collection<Vote> votesCast = votes.values();
		int count = 0;
		/*
		 * First make the views consistent. Sometimes peers will have different zxids
		 * for a server depending on timing.
		 */
		for (Vote v : votesCast) {
			if ((v.getId() == l) && (v.getZxid() == zxid))
				count++;
		}

		if (count > (self.getVotingView().size() / 2))
			return true;
		else
			return false;

	}

	private boolean totalOrderPredicate(long id, long zxid) {
		if ((zxid > proposedZxid) || ((zxid == proposedZxid) && (id > proposedLeader)))
			return true;
		else
			return false;

	}
}
